-
Notifications
You must be signed in to change notification settings - Fork 544
Add Modal orchestrator with step operator and orchestrator flavors #3733
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
Important Review skippedAuto reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the 📝 WalkthroughWalkthroughThis update introduces a new Modal orchestrator integration to ZenML, enabling orchestration of entire pipelines or individual steps on Modal's serverless cloud. It adds new configuration, settings, and utility modules, updates the Modal integration to support both orchestrator and step operator flavors, enhances documentation, and refactors related code for modularity and clarity. Changes
Sequence Diagram(s)sequenceDiagram
participant User
participant ZenML
participant ModalOrchestrator
participant ModalSandboxExecutor
participant Modal Cloud
User->>ZenML: Trigger pipeline run
ZenML->>ModalOrchestrator: Prepare deployment
ModalOrchestrator->>ModalSandboxExecutor: Setup execution (pipeline/step mode)
ModalSandboxExecutor->>Modal Cloud: Launch sandbox(es) with resources and image
Modal Cloud-->>ModalSandboxExecutor: Run pipeline/steps and stream logs
ModalSandboxExecutor->>ModalOrchestrator: Report completion or errors
ModalOrchestrator->>ZenML: Finalize run status
ZenML-->>User: Pipeline run results
Suggested reviewers
Poem
✨ Finishing touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
- Adds new Modal orchestrator flavor for serverless pipeline execution - Implements optimized execution modes: pipeline (default) and per_step - Supports GPU/CPU resource configuration with intelligent defaults - Features persistent apps with warm containers for fast execution - Includes comprehensive documentation and examples - Simplifies execution model by removing redundant single_function mode 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
Documentation Link Check Results❌ Absolute links check failed |
✅ Branch tenant has been deployed! Access it at: https://staging.cloud.zenml.io/workspaces/feature-modal-orchestrator/projects |
elif ( | ||
log_age < 300 | ||
): # Only show logs from last 5 minutes | ||
# This log is recent enough to likely be ours | ||
logger.info(f"{log_msg}") | ||
# Else: skip old logs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This bit seems maybe like it could use more attention. Also are you sure that we wouldn't have time zone mismatches here etc?
else: | ||
# Fallback to first step's resource settings if no pipeline-level resources | ||
if deployment.step_configurations: | ||
first_step = list(deployment.step_configurations.values())[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the first step is unrepresentative, though?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should check / just make this really explicit in the docs that this is how we assume this
# Register the orchestrator with explicit credentials | ||
zenml orchestrator register <ORCHESTRATOR_NAME> \ | ||
--flavor=modal \ | ||
--token=<MODAL_TOKEN> \ | ||
--workspace=<MODAL_WORKSPACE> \ | ||
--synchronous=true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should use --token-id
and --token-secret
separately as per the code?
### Authentication with different environments | ||
|
||
For production deployments, you can specify different Modal environments: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe could have a little info box in this section (or maybe even above, linking down here) to say that you might want to have two different stacks, each associated with a different modal environment, one for prod and the other for development etc etc.
log_stream_active.set() | ||
start_time = time.time() | ||
|
||
def stream_logs() -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Function in function smells a bit wrong and also wondering if we should instead use their Python SDK to stream the logs? https://github.com/modal-labs/modal-client/blob/4177d0b994ac69e01ada7d7a96655c9dcaae570e/modal/cli/utils.py#L24
Possibly something for down the line, though the func-in-func seems off.
if TYPE_CHECKING: | ||
from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( | ||
ModalOrchestratorConfig, | ||
ModalOrchestratorSettings, | ||
) | ||
|
||
from zenml.integrations.modal.flavors.modal_orchestrator_flavor import ( | ||
ModalExecutionMode, | ||
) | ||
|
||
if TYPE_CHECKING: | ||
from zenml.models import PipelineDeploymentResponse, PipelineRunResponse | ||
from zenml.models.v2.core.pipeline_deployment import PipelineDeploymentBase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
combine the 'if TYPE_CHECKING' parts?
|
||
The Modal orchestrator supports two execution modes: | ||
|
||
1. **`pipeline` (default)**: Runs the entire pipeline in a single Modal function for maximum speed and cost efficiency |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I understand why this pipeline
option is max speed. Isn't it running everything sequentially in the same container? Wouldn't running things in parallel in separate Modal function calls run faster?
src/zenml/integrations/modal/flavors/modal_orchestrator_flavor.py
Outdated
Show resolved
Hide resolved
Using the ZenML `modal` integration, you can orchestrate and scale your ML pipelines on [Modal's](https://modal.com/) serverless cloud platform with minimal setup and maximum efficiency. | ||
|
||
The Modal orchestrator is designed for speed and cost-effectiveness, running entire pipelines in single serverless functions to minimize cold starts and optimize resource utilization. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe some representative screenshot of the Modal UI in here to make the docs a bit friendlier?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think its fine without
- Extract nested log streaming function into ModalLogStreamer class for better code organization - Remove unreliable timezone-based log filtering that could miss logs due to clock skew - Implement smarter resource fallback: use highest requirements across all steps instead of potentially unrepresentative first step - Add logging for resource selection decisions to improve debugging - Fix function-in-function code smell identified in PR review 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
- Combine duplicate TYPE_CHECKING blocks into single import section - Improve import organization and reduce redundancy - Maintain all existing functionality while improving code structure 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
- Import MODAL_ORCHESTRATOR_FLAVOR constant from central location to avoid duplication - Update requirements to modal>=1 after testing compatibility with both orchestrator and step operator - Remove unnecessary utils import that was only for mypy discovery - Maintain consistent import patterns across Modal integration files 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
Based on PR review feedback: - Fix token authentication examples to use --token-id and --token-secret - Add "When NOT to use it" section with clear tradeoffs and alternatives - Add info boxes for environment separation best practices and cost implications - Document Modal vs Step Operator differences with usage recommendations - Add GPU base image requirements and CUDA compatibility warnings - Clarify execution modes: "pipeline" mode reduces overhead vs enables parallelism - Document resource fallback behavior and warming window defaults - Add container warming cost implications with specific guidance - Remove tracking pixel per review request - Improve overall documentation clarity and completeness 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <[email protected]>
max_parallelism: Maximum number of parallel sandboxes (for PER_STEP mode). | ||
synchronous: Wait for completion (True) or fire-and-forget (False). | ||
""" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think having an option to specify the sandbox name would be nice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
cloud: The cloud provider to use for the pipeline execution. | ||
modal_environment: The Modal environment to use for the pipeline execution. | ||
timeout: Maximum execution time in seconds (default 24h). | ||
execution_mode: Execution mode - PIPELINE (fastest) or PER_STEP (granular). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs more explanation IMO, also the naming I'm not sure about.
Both modes execute your pipeline, but PIPELINE
uses a single sandbox and runs sequentially, while PER_STEP
spins up a separate sandbox per step.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sandboxes dont have names and the app names are equal to the pipeline names
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this reply is on the wrong comment. What I meant was actually the app name. I would equate this with specifying a kubernetes namespace or pod name, both of which we allow users to specify but fallback to defaults like the pipeline name if no value is given.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@schustmi ok added
src/zenml/integrations/modal/step_operators/modal_step_operator.py
Outdated
Show resolved
Hide resolved
src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
Outdated
Show resolved
Hide resolved
src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
Outdated
Show resolved
Hide resolved
execution_mode = getattr( | ||
settings, "execution_mode", ModalExecutionMode.PIPELINE | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
execution_mode = getattr( | |
settings, "execution_mode", ModalExecutionMode.PIPELINE | |
) | |
execution_mode = settings.execution_mode |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
logger.error(f"Pipeline execution failed: {e}") | ||
raise | ||
|
||
logger.info("✅ Pipeline execution completed successfully") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only true if the orchestrator is running in sync mode
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
orchestrator_run_id = str(uuid4()) | ||
environment[ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID] = orchestrator_run_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no need to pass this to the entrypoint if it is simply a random UUID. In that case, we can simply generate it in the entrypoint itself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
logger.info( | ||
f"🚀 Executing pipeline with Modal ({execution_mode.lower()} mode)" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the execution mode is pipeline, we do not allow per-step images (as they cannot be used). I suggest you overwrite the get_docker_builds(...)
method as follows:
- Call the super implementation
- Check if there is any build configuration in the result that has
key="orchestrator"
and a non-Nonestep_name
. In that case, raise an error.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
…r.py Co-authored-by: Michael Schuster <[email protected]>
Co-authored-by: Michael Schuster <[email protected]>
…l into feature/modal-orchestrator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@schustmi Fixed your comments
) | ||
|
||
# Build Modal image | ||
zenml_image = self._build_modal_image(deployment, stack, environment) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment: Dont build each time, use metadata of build id to track whether this build is already in modal or not
logger.error(f"Pipeline execution failed: {e}") | ||
raise | ||
|
||
logger.info("✅ Pipeline execution completed successfully") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
execution_mode = getattr( | ||
settings, "execution_mode", ModalExecutionMode.PIPELINE | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
logger.info( | ||
f"🚀 Executing pipeline with Modal ({execution_mode.lower()} mode)" | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
orchestrator_run_id = str(uuid4()) | ||
environment[ENV_ZENML_MODAL_ORCHESTRATOR_RUN_ID] = orchestrator_run_id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
src/zenml/integrations/modal/orchestrators/modal_orchestrator.py
Outdated
Show resolved
Hide resolved
cloud: The cloud provider to use for the pipeline execution. | ||
modal_environment: The Modal environment to use for the pipeline execution. | ||
timeout: Maximum execution time in seconds (default 24h). | ||
execution_mode: Execution mode - PIPELINE (fastest) or PER_STEP (granular). |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sandboxes dont have names and the app names are equal to the pipeline names
raise ValueError( | ||
f"Per-step Docker settings are not supported in PIPELINE " | ||
f"execution mode. Step '{build.step_name}' has custom Docker " | ||
f"settings but will be ignored since the entire pipeline runs " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will not be ignored, but instead the run will fail right here with this exception
) | ||
|
||
# Determine if we should wait for completion | ||
synchronous = ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
synchronous = settings.synchronous
region=settings.region, | ||
app=app, | ||
timeout=86400, # 24h, the max Modal allows | ||
timeout=settings.timeout, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Feel free to ignore as part of this PR as well, but seems like this could also be refactored to use the same code that runs a step of a pipeline in a new sandbox?
token_id=orchestrator.config.token_id, | ||
token_secret=orchestrator.config.token_secret, | ||
workspace=orchestrator.config.workspace, | ||
environment=orchestrator.config.modal_environment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The modal_environment
is not a config option and can therefore be overwritten using the settings. So I'm guessing we should probably fetch this from the pipeline settings instead?
token_id=self.config.token_id, | ||
token_secret=self.config.token_secret, | ||
workspace=self.config.workspace, | ||
environment=self.config.modal_environment, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as for the entrypoint, this should be fetched from the settings instead.
The configured Modal image. | ||
""" | ||
# Try to get existing image from the app | ||
image_name_key = f"zenml_image_{build_id}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How is this even supposed to work? This key is the same for all images in the build, which are certainly not referring to the same docker image.
logger.warning(f"Invalid memory {memory_mb}MB, ignoring.") | ||
memory_mb = None | ||
elif memory_mb < 128: # Less than 128MB seems too low | ||
logger.warning( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why would it not be intentional, the user explicitly specified this?
) | ||
|
||
# Build Modal image from the ZenML-built image | ||
logger.debug(f"Building Modal image from base: {image_name}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Duplicate
zenml_image = ( | ||
modal.Image.from_registry( | ||
image_name, secret=registry_secret | ||
).pip_install("modal") # Install Modal in the container |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
modal
is installed in the container, as part of the ZenML image build that happened for the stack.
def _build_modal_image_from_registry( | ||
image_name: str, | ||
stack: "Stack", | ||
environment: Optional[Dict[str, str]] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Didn't we say we don't build these into the image anymore?
If you do, your image reuse doesn't work, as these environment variables contain e.g. credentials to access the ZenML server. If they get reused, they point to potentially a wrong ZenML user, are credentials for the wrong user, ...
environment: Dict[str, str], | ||
settings: ModalOrchestratorSettings, | ||
shared_image_cache: Optional[Dict[str, Any]] = None, | ||
shared_app: Optional[Any] = None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No Any
run_id: The pipeline run ID. | ||
synchronous: Whether to wait for completion. | ||
""" | ||
logger.debug("Executing entire pipeline in single sandbox") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is called no matter which mode the orchestrator is running in.
command = ( | ||
ModalOrchestratorEntrypointConfiguration.get_entrypoint_command() | ||
) | ||
from uuid import UUID |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import at the top
) | ||
|
||
# Submit the pipeline | ||
run_id = str(placeholder_run.id) if placeholder_run else None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You're converting this to a string here, only to convert it back to a UUID in the executor.execute_pipeline
method.
environment_name=settings.modal_environment, | ||
) | ||
|
||
def _build_entrypoint_command( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 lines of code for a +
? Can be removed I thjink
else: | ||
return f"{build_id}_pipeline_{image_hash}" | ||
|
||
async def execute_pipeline( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is more like start_pipeline_sandbox
right?
def _prepare_modal_api_params( | ||
self, | ||
entrypoint_command: List[str], | ||
image: Any, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No Any
|
||
# Store the image ID for future caching after sandbox creation | ||
# The image should be hydrated after being used in sandbox creation | ||
await self._store_image_id(zenml_image) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should probably happen before the sync waiting?
""" | ||
try: | ||
# After sandbox creation, the image should be hydrated | ||
zenml_image.hydrate() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No zenml image has this method, probably because this is a modal image instead. No Any
type annotations.
# Execute pipeline sandbox | ||
await self._execute_sandbox( | ||
entrypoint_command=entrypoint_command, | ||
mode="PIPELINE", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the mode here can be either pipeline or step, because this launches a sandbox that either
- runs the entire pipeline
- only orchestrator the steps
So we should rather pass the actual mode here, or nothing at all. This is anyway only used for tags.
Review the following changes in direct dependencies. Learn more about Socket for GitHub. |
Warning Review the following alerts detected in dependencies. According to your organization's Security Policy, it is recommended to resolve "Warn" alerts. Learn more about Socket for GitHub.
|
New Features
Documentation
Refactor
Tests
Breaking Changes
None - this is a new integration that doesn't affect existing functionality.
Dependencies
Note: This orchestrator follows the same patterns as other ZenML orchestrators
(GCP Vertex, Kubernetes) and integrates seamlessly with the existing ZenML
stack architecture.
Note: I also updated the step operator logic to unify it
Pre-requisites
Please ensure you have done the following:
develop
and the open PR is targetingdevelop
. If your branch wasn't based on develop read Contribution guide on rebasing branch to develop.Types of changes
Summary by CodeRabbit
New Features
Enhancements
Bug Fixes
Refactor
Documentation